import os
import pickle
from abc import abstractmethod

from dagster import IOManager, check, io_manager
from dagster.config import Field
from dagster.config.source import StringSource
from dagster.utils import PICKLE_PROTOCOL, mkdir_p
from dagster.utils.backcompat import experimental


class MemoizableIOManager(IOManager):
    """
    Base class for IO manager enabled to work with memoized execution. Users should implement
    the ``load_input`` and ``handle_output`` methods described in the ``IOManager`` API, and the
    ``has_output`` method, which returns a boolean representing whether a data object can be found.
    """

    @abstractmethod
    def has_output(self, context):
        """The user-defined method that returns whether data exists given the metadata.

        Args:
            context (OutputContext): The context of the step performing this check.

        Returns:
            bool: True if there is data present that matches the provided context. False otherwise.
        """


class VersionedPickledObjectFilesystemIOManager(MemoizableIOManager):
    def __init__(self, base_dir=None):
        self.base_dir = check.opt_str_param(base_dir, "base_dir")
        self.write_mode = "wb"
        self.read_mode = "rb"

    def _get_path(self, context):
        # automatically construct filepath
        step_key = check.str_param(context.step_key, "context.step_key")
        output_name = check.str_param(context.name, "context.name")
        version = check.str_param(context.version, "context.version")

        return os.path.join(self.base_dir, step_key, output_name, version)

    def handle_output(self, context, obj):
        """Pickle the data with the associated version, and store the object to a file.

        This method omits the AssetMaterialization event so assets generated by it won't be tracked
        by the Asset Catalog.
        """

        filepath = self._get_path(context)

        # Ensure path exists
        mkdir_p(os.path.dirname(filepath))

        with open(filepath, self.write_mode) as write_obj:
            pickle.dump(obj, write_obj, PICKLE_PROTOCOL)

    def load_input(self, context):
        """Unpickle the file and Load it to a data object."""

        filepath = self._get_path(context.upstream_output)

        with open(filepath, self.read_mode) as read_obj:
            return pickle.load(read_obj)

    def has_output(self, context):
        """Returns true if data object exists with the associated version, False otherwise."""

        filepath = self._get_path(context)

        return os.path.exists(filepath) and not os.path.isdir(filepath)


@io_manager(config_schema={"base_dir": Field(StringSource, is_required=True)})
@experimental
def versioned_filesystem_io_manager(init_context):
    """Filesystem IO manager that utilizes versioning of stored objects.

    It requires users to specify a base directory where all the step outputs will be stored in. It
    serializes and deserializes output values (assets) using pickling and automatically constructs
    the filepaths for the assets using the provided directory, and the version for a provided step
    output.
    """
    return VersionedPickledObjectFilesystemIOManager(
        base_dir=init_context.resource_config.get("base_dir")
    )
